RxJava2-map

介绍

操作符map,字面理解一下,就是映射,那么这个操作符如何使用呢?

举例说明

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 1");
e.onNext(1);
Logger("Emit 2");
e.onNext(2);
e.onComplete();
}
});

Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
}

@Override
public void onNext(String object) {
Logger(" onNext " + object);
}

@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}

@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "i am " + integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

运行结果

这里写图片描述

可以看到首先执行的是onSubscribe方法,然后发射两个onNext事件,两个Integer参数经过转换之后成为String类型,然后传递给Observer作出动作,这就是Map的作用。

源码浅析

下面是map操作符的源码。

1
2
3
4
5
6
7
//观察map中Function的两个参数,一个是T也就是变化前的类型,R为变化后的类型,最后返回的是R类型的Observerable(姑且这么说)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//常规非空检查
ObjectHelper.requireNonNull(mapper, "mapper is null");
//又是onAssembly,不重要,关注ObservableMap
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

看下ObservableMap的构造方法,传入的参数,source为转换前的Observable< T >, 第二个参数就是我们得function。
ObservableMap继承AbstractObservableWithUpstream,而后者继承Observable,可以看出来,经过map转换后的Observable对象就是ObservableMap类型。

1
2
3
4
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}

既然转换后是ObservableMap类型,那么必然存在subscribeActual方法,果然

1
2
3
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

看看MapObserver是个什么东西?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//MapObserver继承BasicFuseableObserver,而后者实现了Observer接口
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
//map操作符中的function
final Function<? super T, ? extends U> mapper;
//actual是栗子中我们后面subscribe的Observer
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) {
//done在执行了onError或者onComplete之后为true,具体查看父类BasicFuseableObserver。done为true时,不做处理。
if (done) {
return;
}

if (sourceMode != NONE) {
actual.onNext(null);
return;
}

//U类型,转换后的类型,栗子中为String
U v;

try {
//常规非空检查,但是里面有个apply的操作,这个apply的操作就是我们上面栗子中把Integer转成String的操作。v就是转换后的String类型
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//Observer执行onNext操作,v为转换后的类型,如栗子中的String
actual.onNext(v);
}

@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}

为什么发射之后Observer的onNext没有立刻执行,因为上文例子中subscribeOn和observeOn所在的线程不一样。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×